package cn.doitedu.udfs;

import cn.doitedu.pojo.LogBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Converts a raw JSON string into a {@link LogBean}; records that cannot be
 * converted are filtered out instead of failing the job.
 *
 * <p>Implemented as a plain {@link ProcessFunction}, i.e. it is meant to be
 * applied to a non-keyed stream (no {@code keyBy} required).
 *
 * @create: 2021-11-02 20:14
 * @author: 静如海的男人
 * @program: Eagle
 **/

public class JsonToLogBeanFunction extends ProcessFunction<String, LogBean> {

    /**
     * Parses one JSON record and forwards the resulting bean downstream.
     *
     * <p>A record is dropped when parsing throws, or when the parser produces
     * no object at all (a {@code null} result).
     *
     * @param value the raw JSON text of a single log record
     * @param ctx   the per-record context supplied by the runtime (unused here)
     * @param out   collector that receives the successfully parsed bean
     * @throws Exception if emitting the parsed bean fails; parse failures are
     *                   NOT propagated, the offending record is skipped
     */
    @Override
    public void processElement(String value, Context ctx, Collector<LogBean> out) throws Exception {
        LogBean dataBean;
        try {
            // Only the parse step is guarded: an exception raised while emitting
            // must not be mistaken for (and hidden as) a malformed record.
            dataBean = JSON.parseObject(value, LogBean.class);
        } catch (Exception ignored) {
            // Deliberate best-effort filter: malformed records are skipped.
            // TODO persist the rejected records (e.g. to a side output) so they can be inspected
            return;
        }

        // The parser can return null without throwing (e.g. for empty input or
        // the JSON literal null); such a record carries no data, so filter it
        // out rather than handing a null element to downstream operators.
        if (dataBean == null) {
            return;
        }

        out.collect(dataBean);
    }
}
